{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Decision Tree Classification"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create entry points to spark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark import SparkContext\n",
    "sc = SparkContext(master = 'local')\n",
    "\n",
    "from pyspark.sql import SparkSession\n",
    "spark = SparkSession.builder \\\n",
    "          .appName(\"Python Spark SQL basic example\") \\\n",
    "          .config(\"spark.some.config.option\", \"some-value\") \\\n",
    "          .getOrCreate()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Decision tree classification with pyspark"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Import data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+---------+---------+---+\n",
      "|age|education|wantsMore|  y|\n",
      "+---+---------+---------+---+\n",
      "|<25|      low|      yes|  0|\n",
      "|<25|      low|      yes|  0|\n",
      "|<25|      low|      yes|  0|\n",
      "|<25|      low|      yes|  0|\n",
      "|<25|      low|      yes|  0|\n",
      "+---+---------+---------+---+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "cuse = spark.read.csv('data/cuse_binary.csv', header=True, inferSchema=True)\n",
    "cuse.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Process categorical columns\n",
    "The following code does three things with pipeline:\n",
    "\n",
    "* **`StringIndexer`** all categorical columns\n",
    "* **`OneHotEncoder`** all categorical index columns\n",
    "* **`VectorAssembler`** all feature columns into one vector column"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Categorical columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler\n",
    "from pyspark.ml import Pipeline\n",
    "\n",
    "# categorical columns\n",
    "categorical_columns = cuse.columns[0:3]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Build StringIndexer stages"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "stringindexer_stages = [StringIndexer(inputCol=c, outputCol='strindexed_' + c) for c in categorical_columns]\n",
    "# encode label column and add it to stringindexer_stages\n",
    "stringindexer_stages += [StringIndexer(inputCol='y', outputCol='label')]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Build OneHotEncoder stages"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "onehotencoder_stages = [OneHotEncoder(inputCol='strindexed_' + c, outputCol='onehot_' + c) for c in categorical_columns]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Build VectorAssembler stage"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "feature_columns = ['onehot_' + c for c in categorical_columns]\n",
    "vectorassembler_stage = VectorAssembler(inputCols=feature_columns, outputCol='features') "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Build pipeline model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# all stages\n",
    "all_stages = stringindexer_stages + onehotencoder_stages + [vectorassembler_stage]\n",
    "pipeline = Pipeline(stages=all_stages)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Fit pipeline model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "pipeline_model = pipeline.fit(cuse)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Transform data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------------+----------------+----------------+-------------------+-----+\n",
      "|   onehot_age|onehot_education|onehot_wantsMore|           features|label|\n",
      "+-------------+----------------+----------------+-------------------+-----+\n",
      "|(3,[2],[1.0])|       (1,[],[])|   (1,[0],[1.0])|(5,[2,4],[1.0,1.0])|  0.0|\n",
      "|(3,[2],[1.0])|       (1,[],[])|   (1,[0],[1.0])|(5,[2,4],[1.0,1.0])|  0.0|\n",
      "|(3,[2],[1.0])|       (1,[],[])|   (1,[0],[1.0])|(5,[2,4],[1.0,1.0])|  0.0|\n",
      "|(3,[2],[1.0])|       (1,[],[])|   (1,[0],[1.0])|(5,[2,4],[1.0,1.0])|  0.0|\n",
      "|(3,[2],[1.0])|       (1,[],[])|   (1,[0],[1.0])|(5,[2,4],[1.0,1.0])|  0.0|\n",
      "+-------------+----------------+----------------+-------------------+-----+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "final_columns = feature_columns + ['features', 'label']\n",
    "cuse_df = pipeline_model.transform(cuse).\\\n",
    "            select(final_columns)\n",
    "            \n",
    "cuse_df.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Split data into training and test datasets"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "training, test = cuse_df.randomSplit([0.8, 0.2], seed=1234)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Build cross-validation model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Estimator"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.regression import GeneralizedLinearRegression\n",
    "from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier\n",
    "\n",
    "dt = DecisionTreeClassifier(featuresCol='features', labelCol='label')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Parameter grid"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.tuning import ParamGridBuilder\n",
    "param_grid = ParamGridBuilder().\\\n",
    "    addGrid(dt.maxDepth, [2,3,4,5]).\\\n",
    "    build()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Evaluator"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.evaluation import BinaryClassificationEvaluator\n",
    "evaluator = BinaryClassificationEvaluator(rawPredictionCol=\"rawPrediction\", metricName=\"areaUnderROC\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Build cross-validation model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.tuning import CrossValidator\n",
    "cv = CrossValidator(estimator=dt, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=4)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Fit cross-validation mode"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "cv_model = cv.fit(cuse_df)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Prediction"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "show_columns = ['features', 'label', 'prediction', 'rawPrediction', 'probability']"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "##### Prediction on training data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+-----+----------+-------------+----------------------------------------+\n",
      "|features |label|prediction|rawPrediction|probability                             |\n",
      "+---------+-----+----------+-------------+----------------------------------------+\n",
      "|(5,[],[])|0.0  |1.0       |[203.0,237.0]|[0.46136363636363636,0.5386363636363637]|\n",
      "|(5,[],[])|0.0  |1.0       |[203.0,237.0]|[0.46136363636363636,0.5386363636363637]|\n",
      "|(5,[],[])|0.0  |1.0       |[203.0,237.0]|[0.46136363636363636,0.5386363636363637]|\n",
      "|(5,[],[])|0.0  |1.0       |[203.0,237.0]|[0.46136363636363636,0.5386363636363637]|\n",
      "|(5,[],[])|0.0  |1.0       |[203.0,237.0]|[0.46136363636363636,0.5386363636363637]|\n",
      "+---------+-----+----------+-------------+----------------------------------------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "pred_training_cv = cv_model.transform(training)\n",
    "pred_training_cv.select(show_columns).show(5, truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "##### Prediction on test data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---------+-----+----------+-------------+----------------------------------------+\n",
      "|features |label|prediction|rawPrediction|probability                             |\n",
      "+---------+-----+----------+-------------+----------------------------------------+\n",
      "|(5,[],[])|0.0  |1.0       |[203.0,237.0]|[0.46136363636363636,0.5386363636363637]|\n",
      "|(5,[],[])|0.0  |1.0       |[203.0,237.0]|[0.46136363636363636,0.5386363636363637]|\n",
      "|(5,[],[])|0.0  |1.0       |[203.0,237.0]|[0.46136363636363636,0.5386363636363637]|\n",
      "|(5,[],[])|0.0  |1.0       |[203.0,237.0]|[0.46136363636363636,0.5386363636363637]|\n",
      "|(5,[],[])|0.0  |1.0       |[203.0,237.0]|[0.46136363636363636,0.5386363636363637]|\n",
      "+---------+-----+----------+-------------+----------------------------------------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "pred_test_cv = cv_model.transform(test)\n",
    "pred_test_cv.select(show_columns).show(5, truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Confusion matrix\n",
    "\n",
    "Pyspark doesn’t have a function to calculate the confusion matrix automatically, but we can still easily get a confusion matrix with a combination use of several methods from the RDD class."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "defaultdict(int,\n",
       "            {Row(label=0.0, prediction=0.0): 897,\n",
       "             Row(label=0.0, prediction=1.0): 203,\n",
       "             Row(label=1.0, prediction=0.0): 270,\n",
       "             Row(label=1.0, prediction=1.0): 237})"
      ]
     },
     "execution_count": 19,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "label_and_pred = cv_model.transform(cuse_df).select('label', 'prediction')\n",
    "label_and_pred.rdd.zipWithIndex().countByKey()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Parameters from the best model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "The best MaxDepth is: 3\n"
     ]
    }
   ],
   "source": [
    "print('The best MaxDepth is:', cv_model.bestModel._java_obj.getMaxDepth())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---\n",
    "# Decision tree classification with R"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Load R libraries"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```\n",
    "library(rpart)\n",
    "library(caret)\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Import data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```\n",
    "cuse_binary = read.csv('data/cuse_binary.csv', header = TRUE)\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Encode categorical variables"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```\n",
    "cuse_binary$age = factor(cuse_binary$age, \n",
    "                         levels = names(sort(table(cuse_binary$age), decreasing = TRUE)))\n",
    "cuse_binary$education = factor(cuse_binary$education,\n",
    "                               levels = names(sort(table(cuse_binary$education), decreasing = TRUE)))\n",
    "cuse_binary$wantsMore = factor(cuse_binary$wantsMore,\n",
    "                               levels = names(sort(table(cuse_binary$wantsMore), decreasing = TRUE)))\n",
    "\n",
    "\n",
    "cuse_binary$y = factor(cuse_binary$y,\n",
    "                               levels = names(sort(table(cuse_binary$y))))\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Fit decision tree model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```\n",
    "dt_fit = rpart(y ~ age + education + wantsMore, \n",
    "               data = cuse_binary, method = 'class')\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Confusion matrix"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```\n",
    "pred_y = predict(dt_fit, type = 'class')\n",
    "confusionMatrix(data = pred_y, reference = cuse_binary$y)\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---\n",
    "### Result"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "```\n",
    "Confusion Matrix and Statistics\n",
    "\n",
    "          Reference\n",
    "Prediction   1   0\n",
    "         1 237 203\n",
    "         0 270 897\n",
    "                                          \n",
    "               Accuracy : 0.7057          \n",
    "                 95% CI : (0.6827, 0.7279)\n",
    "    No Information Rate : 0.6845          \n",
    "    P-Value [Acc > NIR] : 0.035460        \n",
    "                                          \n",
    "                  Kappa : 0.2934          \n",
    " Mcnemar's Test P-Value : 0.002408        \n",
    "                                          \n",
    "            Sensitivity : 0.4675          \n",
    "            Specificity : 0.8155          \n",
    "         Pos Pred Value : 0.5386          \n",
    "         Neg Pred Value : 0.7686          \n",
    "             Prevalence : 0.3155          \n",
    "         Detection Rate : 0.1475          \n",
    "   Detection Prevalence : 0.2738          \n",
    "      Balanced Accuracy : 0.6415          \n",
    "                                          \n",
    "       'Positive' Class : 1  \n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.1"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
